MongoDB学习(三)一一MongoDB MapReduce

MongoDB MapReduce

MongoDB 固定集合

MapReduce一般都是在集合中操作,所以我们先来看MongoDB 固定集合(Capped Collections),它是性能出色且有着固定大小的集合,对于大小固定,我们可以想象其就像一个环形队列,当集合空间用完后,再插入的元素就会覆盖最初始的头部的元素!

创建固定集合

1
>db.createCollection("cappedLogCollection",{capped:true,size:10000})

还可以指定文档个数,加上max:1000属性:

1
>db.createCollection("cappedLogCollection",{capped:true,size:10000,max:1000})

判断是否为固定集合

1
>db.cappedLogCollection.isCapped()

转换为固定集合

1
2
3
4
> db.runCommand({"convertToCapped":"posts", size:10000})
{ "ok" : 1 }
> db.posts.isCapped()
true

固定集合查询

固定集合文档按照插入顺序储存的,默认情况下查询就是按照插入顺序返回的,也可以使用$natural调整返回顺序。

1
> db.posts.find().sort({$natural:-1})

固定集合的功能特点

可以插入及更新,但更新不能超出collection的大小,否则更新失败,不允许删除,但是可以调用drop()删除集合中的所有行,但是drop后需要显式地重建集合。
在32位机子上一个cappped collection的最大值约为482.5M,64位上只受系统文件大小的限制。

固定集合属性及用法

属性
  • 对固定集合进行插入速度极快

  • 按照插入顺序的查询输出速度极快

  • 能够在插入最新数据时,淘汰最早的数据

用法
  • 储存日志信息

  • 缓存一些少量的文档

MapReduce

说道MapReduce,就不得不提大名鼎鼎的Google的论文MapReduce: Simplified Data Processing on Large Clusters,我直接把他上传到七牛云了,因为谷歌的网站需要科学上网比较麻烦。

如果你看了这篇论文,相信你一定能知道map/reduce的概念了。

有时间一定写一篇博客仔细研读一下(这是个坑)

挖了这个坑,还不满足,Google还有 GFS BigTable Chubby 这些大名鼎鼎的论文呢。

简单的介绍一下:

Map函数

Map就是映射,接受一个键值对(key-value pair),产生一组中间键值对。MapReduce框架会将map函数产生的中间键值对里键相同的值传递给一个reduce函数。

伪代码:

1
2
3
4
5
6
ClassMapper
methodmap(String input_key, String input_value):
// input_key: text document name
// input_value: document contents
for eachword w ininput_value:
EmitIntermediate(w, "1");

Reduce函数

接受一个键,以及相关的一组值,将这组值进行合并产生一组规模更小的值(通常只有一个或零个值)。

伪代码:

1
2
3
4
5
6
7
8
ClassReducer
method reduce(String output_key,Iterator intermediate_values):
// output_key: a word
// output_values: a list of counts
intresult = 0;
for each v in intermediate_values:
result += ParseInt(v);
Emit(AsString(result));

在统计词频的例子里,map函数接受的键是文件名,值是文件的内容,map逐个遍历单词,每遇到一个单词w,就产生一个中间键值对< w, “1”>,这表示单词w又找到了一个;MapReduce将键相同(都是单词w)的键值对传给reduce函数,这样reduce函数接受的键就是单词w,值是一串”1”(最基本的实现是这样,但可以优化),个数等于键为w的键值对的个数,然后将这些“1”累加就得到单词w的出现次数。最后这些单词的出现次数会被写到用户定义的位置,存储在底层的分布式存储系统(GFS或HDFS)


函数式编程

在python的函数式编程中,内建了map()和reduce()函数。也可以说MapReduce的主要思想,都是从函数式编程语言里借来的。之前我也又看到一个比较好玩的代码分享一下,我还发给过我的程序员高中同学当生日祝福→_→

1
print map(lambda x:"Happy Birthday to " + ("you" if x != 2 else "name"), range(4))

运行的结果就是这样啦:

1
['Happy Birthday to you', 'Happy Birthday to you', 'Happy Birthday to name', 'Happy Birthday to you']

MapReduce C++实现

啊说道MapReduce,我又想起了之前写的C++实现,show you the code:

wordcount_map.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
//map是读取信息,然后输出键值对
//对于Wordcount来说,map只需要把每个单词分别输出,后面再输出个1就行,表示每个单词出现了1次
#include <iostream>
#include <string>
using namespace std;
int main(int argc, char** argv) {
string word;
while(cin >> word) {
cout << word << "/t" << "1" << endl;
}
return 0;
}

wordcount_reduce.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//读取map输出的键值对,并把key值相同的键值做整合,并输出整合结果
#include <iostream>
#include <string>
#include <map>
using namespace std;
int main(int argc, char** argv) {
string key, num;
map<string, int> count;
map<string, int>::iterator it;
while(cin >> key >> num) {
it = count.find(key);
if(it != count.end()) {
it->second++;
} else {
count.insert(make_pair(key, 1));
}
}
for(it = count.begin(); it != count.end(); it++) {
cout << it->first << "/t" << it->second << endl;
}
return 0;
}

MongoDB MapReduce

绕了一大圈终于要说到我们的主题了,哈哈这也是我的风格,喜欢一下子联想超级多的东西,不然也是,学技术的就应该这样把所有东西都串起来,串成烧烤多好吃啊…

MapReduce 命令

以下是MapReduce的基本语法:

1
2
3
4
5
6
7
8
9
10
>db.collection.mapReduce(
function() {emit(key,value);}, //map 函数
function(key,values) {return reduceFunction}, //reduce 函数
{
out: collection,
query: document,
sort: document,
limit: number
}
)

使用 MapReduce 要实现两个函数 Map 函数和 Reduce 函数,Map 函数调用 emit(key, value), 遍历 collection 中所有的记录, 将 key 与 value 传递给 Reduce 函数进行处理。
Map 函数必须调用 emit(key, value) 返回键值对。

参数说明:

  • map:映射函数 (生成键值对序列,作为 reduce 函数参数)

  • reduce:统计函数,reduce函数的任务就是将key-values变成key-value,也就是把values数组变成一个单一的值value

  • out: 统计结果存放集合 (不指定则使用临时集合,在客户端断开后自动删除)

  • query: 一个筛选条件,只有满足条件的文档才会调用map函数。(query。limit,sort可以随意组合)

  • sort: 和limit结合的sort排序参数(也是在发往map函数前给文档排序),可以优化分组机制

  • limit: 发往map函数的文档数量的上限(要是没有limit,单独使用sort的用处不大)

MapReduce 使用

先插入我们的数据,伪装成大数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
> db.posts.insert({
... "post_text": "kelele67 is shuaibi",
... "user_name": "67",
... "status":"active"
... })
WriteResult({ "nInserted" : 1 })
> db.posts.insert({
... "post_text": "kelele67 is shuaibi",
... "user_name": "67",
... "status":"active"
... })
WriteResult({ "nInserted" : 1 })
> db.posts.insert({
... "post_text": "kelele67 is shuaibi",
... "user_name": "67",
... "status":"active"
... })
WriteResult({ "nInserted" : 1 })
> db.posts.insert({
... "post_text": "kelele67 is shuaibi",
... "user_name": "67",
... "status":"active"
... })
WriteResult({ "nInserted" : 1 })
> db.posts.insert({
... "post_text": "kelele67 is shuaibi",
... "user_name": "67",
... "status":"disabled"
... })
WriteResult({ "nInserted" : 1 })
> db.posts.insert({
... "post_text": "kelele67 is shuaibi",
... "user_name": "liu",
... "status":"disabled"
... })
WriteResult({ "nInserted" : 1 })
> db.posts.insert({
... "post_text": "kelele67 is shuaibi",
... "user_name": "liu",
... "status":"disabled"
... })
WriteResult({ "nInserted" : 1 })
> db.posts.insert({
... "post_text": "kelele67 is shuaibi",
... "user_name": "liu",
... "status":"active"
... })
WriteResult({ "nInserted" : 1 })

然后再posts集合中使用MapReduce函数来选取已发布的文章(status:”active”),并通过user_name分组,计算每个用户的文章数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> db.posts.mapReduce(
... function() {emit(this.user_name, 1);},
... function(key, values) {return Array.sum(values)},
... {
... query:{status:"active"},
... out:"post_total"
... }
... )
{
"result" : "post_total",
"timeMillis" : 288,
"counts" : {
"input" : 5,
"emit" : 5,
"reduce" : 1,
"output" : 2
},
"ok" : 1
}

结果表明,共有 5 个符合查询条件(status:”active”)的文档, 在map函数中生成了 5 个键值对文档,最后使用reduce函数将相同的键值分为 2 组。

具体参数说明:

  • result:储存结果的collection的名字,这是个临时集合,MapReduce的连接关闭后自动就被删除了

  • timeMillis:执行花费的时间,毫秒为单位

  • input:满足条件被发送到map函数的文档个数

  • emit:在map函数中emit被调用的次数,也就是所有集合中的数据总量

  • ouput:结果集合中的文档个数(count对调试非常有帮助)

  • ok:是否成功,成功为1

  • err:如果失败,这里可以有失败原因,不过从经验上来看,原因比较模糊,作用不大

使用find

1
2
3
4
5
6
7
8
9
10
> db.posts.mapReduce(
... function() {emit(this.user_name, 1);},
... function(key, values) {return Array.sum(values)},
... {
... query:{status:"active"},
... out:"post_total"
... }
... ).find()
{ "_id" : "67", "value" : 4 }
{ "_id" : "liu", "value" : 1 }

查询结果显示,两个用户 67 和 liu 分别有4篇和1篇active的文章

总结

用类似的方式,MapReduce可以被用来构建大型复杂的聚合查询。
Map函数和Reduce函数可以使用 JavaScript 来实现,使得MapReduce的使用非常灵活和强大。

MapReduce 技巧

  • 对结果进行数据类型转化

利用Finalize函数(该函数是在Reduce函数后调用,它将对所有key的Reduce结果进行最后的操作),例如我在后台调用了api后想得到的是int型数据,而不是double的,那么就可以添加Finalize函数:

1
2
3
4
5
6
...,
finalize : function Finalize(key, reduced) {
reduced.count = NumberInt(reduced.count);
return reduced;
},
...

这样,输出的reduced将会是int32,在后台你就直接用一个强制转化就行了,而不需要先从object转为double,再转为int(用ToString后再用Prase也不如强制转化)。

  • 时间类型

因为mongodb是有Date类型的,但是由于存入的时间格式和查询时间的格式可能不一致(特别是在你的mongodb部署在远程,而开发又是多人协作),会导致根据时间条件,却查不出数据的问题。我的建议,直接存时间的long形态(过去秒数),那么这种差异性问题就不复存在。

MapReduce 注意的地方

小tips:mongoDB中建立的索引,优先使用固定的,而不要使用范围。

Reduce时的计数问题

这个问题主要出现在使用“+1”的思路去计算累计次数时。如果在Map后的某一类中,记录量过大,就会导致计数失败

我们先准备好我们的数据,插入400条相同的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
> db.runCommand({ mapreduce: "stu",
...map : function Map() {
... emit(
... {grade:this.grade},
... {recnum:1,score:this.score}
... );
...},
...reduce : function Reduce(key, values) {
... var reduced = {recnum:0,score:0};
... values.forEach(function(val){
... reduced.score += val.score;
... ++reduced.recnum;
... });
... return reduced;
...},
...finalize : function Finalize(key, reduced) {
... return reduced;
...},
...out : { inline : 1 }
...});
{
"results" : [
{
"_id" : {
"grade" : 3
},
"value" : {
"recnum" : 101,
"score" : 38000
}
}
],
"timeMillis" : 107,
"counts" : {
"input" : 400,
"emit" : 400,
"reduce" : 4,
"output" : 1
},
"ok" : 1
}

可以看到value.recnum会输出的是101并不是400,而value.scorce却是输出的正确的:38000(95*400)

并且通过更改reduce函数: function Reduce(key, values) { return {test:values}; } ,发现数据是这样的

在原本Reduce函数中的forEach只遍历了第一层的数据,即101个,所以++操作也只做了101次!   

导致问题的原因关键就在于MapReduce中emit后的Bosn的数据格式,一个大于100的Array,会被拆分存储,变成了非线性的链表结构

最后,这里给出计数的替代方案,修改Reduce的++,改用+=操作:

Reduce时的提取数据问题

这个问题产生的原因与上面的相似,也是由于emit后的数据在reduce时是非线性的(有层次关系),所以提取数据字段时也会产生问题,为了测试,往上面所说的表中再插入3条数据:

1
2
3
4
5
6
> db.stu.insert({ "grade" : 3, "name" : "liu", "score" : 95 })
WriteResult({ "nInserted" : 1 })
> db.stu.insert({ "grade" : 2, "name" : "qi", "score" : 95 })
WriteResult({ "nInserted" : 1 })
> db.stu.insert({ "grade" : 2, "name" : "lq", "score" : 95 })
WriteResult({ "nInserted" : 1 })

  编写提取出各个grade的所有人名(不重复)列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
> db.runCommand({ mapreduce: "stu",
... map : function Map() {
... emit(
... {grade:this.grade},
... {name:this.name}
... );
... },
... reduce : function Reduce(key, values) {
... var reduced = {names:[]};
... values.forEach(function(val) {
... var isExist = false;
... for(var i = 0; i<reduced.names.length; i++) {
... var cur = reduced.names[i];
... if(cur==val.name){
... isExist = true;
... break;
... }
... }
... if(!isExist)
... reduced.names.push(val.name);
... });
... return reduced;
... },
... finalize : function Finalize(key, reduced) {
... return reduced;
... },
... out : { inline : 1 }
... });
{
"results" : [
{
"_id" : {
"grade" : 2
},
"value" : {
"names" : [
"qi",
"lq"
]
}
},
{
"_id" : {
"grade" : 3
},
"value" : {
"names" : [
undefined,
"liu"
]
}
}
],
"timeMillis" : 20,
"counts" : {
"input" : 403,
"emit" : 403,
"reduce" : 6,
"output" : 2
},
"ok" : 1
}

新插入的grade=2的两条数据正常了,但grade=3的kelele67却不见了!采用上一个问题的思维方式,肯定也是在Reduce时遍历到一个数组对象,其name值为空,也给添加进来了,kelele67对象根本就没有访问到。   

解决这一问题的方法是,抛弃MapReduce,改用Group:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
> db.stu.group({
... key : {"grade":true},
... initial : {names:[]},
... reduce : function Reduce(val, out) {
... var isExist = false;
... for(var i = 0; i<out.names.length; i++) {
... var cur = out.names[i];
... if(cur==val.name){
... isExist = true;
... break;
... }
... }
... if(!isExist)
... out.names.push(val.name);
... },
... finalize : function Finalize(out) {
... return out;
... }});
[
{
"grade" : 3,
"names" : [
"kelele67",
"liu"
]
},
{
"grade" : 2,
"names" : [
"qi",
"lq"
]
}
]

这样,便可正常取到grade=3时的name非重复集合!虽说MapReduce比Group要强大,速度也要快很多,但像这种要从大量项(超过100条)中提取数据,就有很大风险了。所以,使用MapReduce时,尽量只用到累加、累减、累乘等基本操作,不要去用++、push、delete等可能会产生风险的操作!

更多tips

  • 使用Group或MapReduce时,如果一个分类只有一个元素,那么Reduce函数将不会执行,但Finalize函数还是会执行的。这时你要在Finalize函数中考虑一个元素与多个元素返回结果的一致性(比如,你把问题二中插入一个grade=3的数据看看,执行返回的grade=3时还有names集合吗?)。

  • 查找范围时的索引效率,如果查询的是一个值的范围,它索引的优先级是很低的。比如一个表test,有海量元素,字段有’committime’、’author’,建立了两个索引:author_1、committime:-1,author:1,下面的测试证明了效率:

1
2
    db.test.find({'committime':{'$gt':910713600000,'$lte':1410192000000},'author':'lekko'}).hint({committime:-1,author:1}).explain()   "millis" : 49163
    db.test.find({'committime':{'$gt':910713600000,'$lte':1410192000000},'author':'lekko'}).explain() author_1              "millis" : 2641

参考

在MongoDB的MapReduce上踩过的坑